package com.jourwon.spring.boot.util;

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.jourwon.spring.boot.dto.EsDoc;
import com.jourwon.spring.boot.dto.EsScrollResult;
import com.jourwon.spring.boot.query.QueryPageInfo;
import com.jourwon.spring.boot.query.QuerySearchAfter;
import com.jourwon.spring.boot.respsonse.ResponsePageInfo;
import com.jourwon.spring.boot.respsonse.ResponseSearchAfter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Elasticsearch utility class.
 * Note: to map query results onto an entity class, hutool's
 * BeanUtil.setFieldValue(t, idField, getResponse.getId()) can be used to set the document-ID field.
 *
 * @author JourWon
 * @date 2021/12/31
 */
@Slf4j
@Component
public class EsUtils {

    private static final String COMMA = ",";

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * Validates that an index name was supplied.
     *
     * @param index index name; checked with {@code Assert.hasText}
     */
    private void assertIndex(String index) {
        final String emptyIndexMessage = "索引不能为空";
        Assert.hasText(index, emptyIndexMessage);
    }

    /**
     * Validates that both an index name and a document ID were supplied.
     * The index is checked first, then the ID.
     *
     * @param index index name; checked with {@code Assert.hasText}
     * @param id    document ID; checked with {@code Assert.hasText}
     */
    private void assertIndexAndId(String index, String id) {
        final String emptyIndexMessage = "索引不能为空";
        final String emptyIdMessage = "文档ID不能为空";
        Assert.hasText(index, emptyIndexMessage);
        Assert.hasText(id, emptyIdMessage);
    }

    /**
     * Validates that an index name and a document body were supplied.
     * The index is checked first, then the document.
     *
     * @param index index name; checked with {@code Assert.hasText}
     * @param doc   document content; checked with {@code Assert.notNull}
     */
    private void assertDocument(String index, Object doc) {
        final String emptyIndexMessage = "索引不能为空";
        final String nullDocMessage = "文档内容不能为空";
        Assert.hasText(index, emptyIndexMessage);
        Assert.notNull(doc, nullDocMessage);
    }

    /**
     * Validates that an index name and a non-empty list were supplied.
     * The index is checked first, then the list.
     *
     * @param index        index name; checked with {@code Assert.hasText}
     * @param documentList list of documents or IDs; checked with {@code Assert.notEmpty}
     */
    private void assertIndexAndList(String index, List<?> documentList) {
        final String emptyIndexMessage = "索引不能为空";
        final String emptyListMessage = "文档列表不能为空";
        Assert.hasText(index, emptyIndexMessage);
        Assert.notEmpty(documentList, emptyListMessage);
    }

    /**
     * Validates that an index name, a document ID and a document body were supplied.
     * Checks run in that order.
     *
     * @param index index name; checked with {@code Assert.hasText}
     * @param id    document ID; checked with {@code Assert.hasText}
     * @param doc   document content; checked with {@code Assert.notNull}
     */
    private void assertDocument(String index, String id, Object doc) {
        final String emptyIndexMessage = "索引不能为空";
        final String emptyIdMessage = "文档ID不能为空";
        final String nullDocMessage = "文档内容不能为空";
        Assert.hasText(index, emptyIndexMessage);
        Assert.hasText(id, emptyIdMessage);
        Assert.notNull(doc, nullDocMessage);
    }

    // ======================== Index 索引 ========================

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists; {@code false} if it does not, or if the request
     *         failed with an {@link IOException} (the failure is logged)
     */
    public boolean existsIndex(String index) {
        assertIndex(index);

        GetIndexRequest getIndexRequest = new GetIndexRequest(index);
        // local=false: do not answer from local node information only
        getIndexRequest.local(false);
        // Ask for human-readable values in the response
        getIndexRequest.humanReadable(true);
        // Do not include default settings for the index
        getIndexRequest.includeDefaults(false);

        boolean found;
        try {
            found = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> existsIndex method exception,index:{},error message:{}", index, e.getMessage(), e);
            found = false;
        }
        return found;
    }

    /**
     * Creates an index with default mappings and settings.
     * If the index already exists, nothing is created and the call is reported as successful.
     *
     * @param index index name
     * @return {@code true} if the index already existed or the creation was acknowledged;
     *         {@code false} if it was not acknowledged or an {@link IOException} occurred
     */
    public boolean createIndex(String index) {
        assertIndex(index);

        boolean alreadyPresent = existsIndex(index);
        if (alreadyPresent) {
            log.warn("Index:[{}] is exits!", index);
            return true;
        }

        try {
            // Build and execute the create-index request
            CreateIndexResponse created = restHighLevelClient.indices()
                    .create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
            return created.isAcknowledged();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> createIndex method exception,index:{},error message:{}", index, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Creates an index with explicit field mappings and shard/replica settings.
     * If the index already exists, nothing is created and the call is reported as successful.
     *
     * @param index            index name
     * @param properties       field mapping definitions, written under {@code mappings.properties}
     * @param numberOfShards   value written to {@code settings.number_of_shards}
     * @param numberOfReplicas value written to {@code settings.number_of_replicas}
     * @return {@code true} if the index already existed or the creation was acknowledged;
     *         {@code false} if it was not acknowledged or an {@link IOException} occurred
     */
    public boolean createIndex(String index, Map<String, Map<String, Object>> properties, Integer numberOfShards, Integer numberOfReplicas) {
        assertIndex(index);

        boolean alreadyPresent = existsIndex(index);
        if (alreadyPresent) {
            log.warn("Index:[{}] is exits!", index);
            return true;
        }

        try {
            XContentBuilder body = XContentFactory.jsonBuilder();
            body.startObject();

            // Mappings section; no mapping type level is written (types are deprecated since ES 7.x)
            body.startObject("mappings");
            body.field("properties", properties);
            body.endObject();

            // Settings section: shard and replica counts
            body.startObject("settings");
            body.field("number_of_shards", numberOfShards);
            body.field("number_of_replicas", numberOfReplicas);
            body.endObject();

            body.endObject();

            CreateIndexRequest createRequest = new CreateIndexRequest(index).source(body);
            CreateIndexResponse created = restHighLevelClient.indices().create(createRequest, RequestOptions.DEFAULT);
            return created.isAcknowledged();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> createIndex method exception,index:{},properties:{},error message:{}", index, properties, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Deletes an index.
     * If the index does not exist, nothing is deleted and the call is reported as successful.
     *
     * @param index index name
     * @return {@code true} if the index was absent or the deletion was acknowledged;
     *         {@code false} if it was not acknowledged or an {@link IOException} occurred
     */
    public boolean deleteIndex(String index) {
        assertIndex(index);

        boolean present = existsIndex(index);
        if (!present) {
            log.warn("Index:[{}] is not exits!", index);
            return true;
        }

        try {
            AcknowledgedResponse deleted = restHighLevelClient.indices()
                    .delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
            return deleted.isAcknowledged();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> deleteIndex method exception,index:{},error message:{}", index, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Fetches the settings of an index.
     *
     * @param index index name
     * @return the settings response, or {@code null} if the index does not exist or the request
     *         failed with an {@link IOException}
     */
    public GetSettingsResponse getSettings(String index) {
        assertIndex(index);

        boolean present = existsIndex(index);
        if (!present) {
            log.warn("Index:[{}] is not exits!", index);
            return null;
        }

        GetSettingsRequest settingsRequest = new GetSettingsRequest();
        settingsRequest.indices(index);
        GetSettingsResponse settings;
        try {
            settings = restHighLevelClient.indices().getSettings(settingsRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> getSettings method exception,index:{},error message:{}", index, e.getMessage(), e);
            settings = null;
        }
        return settings;
    }

    /**
     * Fetches the mappings of an index.
     *
     * @param index index name
     * @return the mappings response, or {@code null} if the index does not exist or the request
     *         failed with an {@link IOException}
     */
    public GetMappingsResponse getMapping(String index) {
        assertIndex(index);

        boolean present = existsIndex(index);
        if (!present) {
            log.warn("Index:[{}] is not exits!", index);
            return null;
        }

        GetMappingsRequest mappingsRequest = new GetMappingsRequest();
        mappingsRequest.indices(index);
        GetMappingsResponse mappings;
        try {
            mappings = restHighLevelClient.indices().getMapping(mappingsRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> getMapping method exception,index:{},error message:{}", index, e.getMessage(), e);
            mappings = null;
        }
        return mappings;
    }


    // ======================== Document 文档 ========================

    /**
     * Checks by ID whether a document exists.
     *
     * @param index index name
     * @param id    document ID
     * @return {@code true} if the document exists; {@code false} if it does not, or if the
     *         request failed with an {@link IOException}
     */
    public boolean exists(String index, String id) {
        assertIndexAndId(index, id);

        // Only existence matters: skip fetching _source and stored fields
        GetRequest probe = new GetRequest(index, id)
                .fetchSourceContext(new FetchSourceContext(false))
                .storedFields("_none_");

        boolean found;
        try {
            found = restHighLevelClient.exists(probe, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> existsDocument method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            found = false;
        }
        return found;
    }

    /**
     * Saves a document without specifying a document ID.
     * Delegates to {@link #insert(String, String, Object)} with a {@code null} ID, in which case
     * no ID is set on the index request.
     *
     * @param index index name
     * @param doc   document to save
     * @return {@code true} on success, {@code false} on failure
     */
    public boolean insert(String index, Object doc) {
        final String unspecifiedId = null;
        return insert(index, unspecifiedId, doc);
    }

    /**
     * Saves a document, optionally under a caller-chosen ID.
     * The document is serialized to JSON and sent as an index request; per the original
     * contract, an existing document with the same ID is updated, otherwise a new one is saved.
     *
     * @param index index name
     * @param id    document ID; when blank, no ID is set on the request
     * @param doc   document to save
     * @return {@code true} if the request completed; {@code false} if an {@link IOException} occurred
     */
    public boolean insert(String index, String id, Object doc) {
        assertDocument(index, doc);

        IndexRequest indexRequest = new IndexRequest(index);
        boolean hasExplicitId = !StringUtils.isBlank(id);
        if (hasExplicitId) {
            indexRequest.id(id);
        }
        indexRequest.source(JSON.toJSONString(doc), XContentType.JSON);

        try {
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> insert method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            return false;
        }
        return true;
    }

    /**
     * Saves a batch of documents in one bulk request.
     * Each entry becomes an index request under its own ID; per the original contract, documents
     * that already exist are updated and the others are created.
     *
     * @param index        index name
     * @param documentList documents to save (ID plus data)
     * @return {@code true} if the bulk response reports no failures; {@code false} if any item
     *         failed or an {@link IOException} occurred
     */
    public boolean batchInsertOrUpdate(String index, List<EsDoc<?>> documentList) {
        assertIndexAndList(index, documentList);

        BulkRequest bulk = new BulkRequest();
        for (EsDoc<?> document : documentList) {
            IndexRequest item = new IndexRequest(index)
                    .id(document.getId())
                    .source(JSON.toJSONString(document.getData()), XContentType.JSON);
            bulk.add(item);
        }

        try {
            boolean anyFailed = restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT).hasFailures();
            return !anyFailed;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> batchInsertOrUpdateDocument method exception,index:{},error message:{}", index, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Deletes a document by ID.
     *
     * @param index index name
     * @param id    document ID
     * @return {@code true} if the document was not found (treated as already deleted) or the
     *         response status is {@code OK}; {@code false} otherwise or when an
     *         {@link IOException} occurred
     */
    public boolean deleteById(String index, String id) {
        assertIndexAndId(index, id);

        try {
            DeleteResponse deleteResponse =
                    restHighLevelClient.delete(new DeleteRequest(index, id), RequestOptions.DEFAULT);

            boolean missing = DocWriteResponse.Result.NOT_FOUND == deleteResponse.getResult();
            if (!missing) {
                return RestStatus.OK.equals(deleteResponse.status());
            }
            // Nothing to delete: report success after logging
            log.warn("[Elasticsearch] >> deleteById document is not found, index:{},id:{}", index, id);
            return true;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> deleteById method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Deletes all documents matching a query.
     * Version-conflict handling and refresh behaviour are not configured here, so the request's
     * own defaults apply.
     *
     * @param index        index name
     * @param queryBuilder query selecting the documents to delete
     * @return {@code true} if the request completed; {@code false} if an {@link IOException} occurred
     */
    public boolean deleteByQuery(String index, QueryBuilder queryBuilder) {
        assertIndex(index);

        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(index);
        deleteRequest.setQuery(queryBuilder);

        boolean completed;
        try {
            restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
            completed = true;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> deleteByQuery method exception,index:{},error message:{}", index, e.getMessage(), e);
            completed = false;
        }
        return completed;
    }

    /**
     * Deletes several documents by ID in one bulk request.
     *
     * @param index  index name
     * @param idList IDs of the documents to delete
     * @return {@code true} if the bulk response reports no failures; {@code false} if any item
     *         failed or an {@link IOException} occurred
     */
    public boolean deleteByIdList(String index, List<String> idList) {
        assertIndexAndList(index, idList);

        BulkRequest bulk = new BulkRequest();
        for (String id : idList) {
            bulk.add(new DeleteRequest(index, id));
        }

        try {
            boolean anyFailed = restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT).hasFailures();
            return !anyFailed;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> deleteByIdList method exception,index:{},error message:{}", index, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Updates a document by ID using the JSON form of {@code doc} as a partial document.
     *
     * @param index index name
     * @param id    document ID
     * @param doc   object whose JSON serialization is sent as the update body
     * @return {@code true} if the request completed; {@code false} if an {@link IOException} occurred
     */
    public boolean updateById(String index, String id, Object doc) {
        assertDocument(index, id, doc);

        UpdateRequest updateRequest = new UpdateRequest(index, id)
                .doc(JSON.toJSONString(doc), XContentType.JSON);

        boolean completed;
        try {
            restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            completed = true;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> updateById method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            completed = false;
        }
        return completed;
    }

    /**
     * Updates a document by ID, inserting it when it does not exist yet (doc-as-upsert).
     * Notes:
     * 1) existing field values can be changed and new fields can be added;
     * 2) if no document with this ID exists, the supplied document is inserted.
     *
     * @param index index name
     * @param id    document ID
     * @param doc   object whose JSON serialization is sent as the update/upsert body
     * @return {@code true} if the request completed; {@code false} if an {@link IOException} occurred
     */
    public boolean updateOrInsertById(String index, String id, Object doc) {
        assertDocument(index, id, doc);

        UpdateRequest upsertRequest = new UpdateRequest(index, id)
                .doc(JSON.toJSONString(doc), XContentType.JSON)
                .docAsUpsert(true);

        boolean completed;
        try {
            restHighLevelClient.update(upsertRequest, RequestOptions.DEFAULT);
            completed = true;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> updateOrInsertById method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            completed = false;
        }
        return completed;
    }

    /**
     * Updates all documents matching a query by running a script against them.
     * <p>
     * Unlike the other write methods, invalid arguments do not raise an exception: when the index
     * is empty or the query/script is {@code null}, nothing is sent and {@code false} is returned.
     *
     * @param index        index name
     * @param queryBuilder query selecting the documents to update
     * @param script       script applied to each matching document
     * @return {@code true} if the update-by-query request completed; {@code false} if an argument
     *         was missing or an {@link IOException} occurred
     */
    public boolean updateByQuery(String index, QueryBuilder queryBuilder, Script script) {
        if (StringUtils.isEmpty(index) || Objects.isNull(queryBuilder) || Objects.isNull(script)) {
            return false;
        }

        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
        updateByQueryRequest.setQuery(queryBuilder);
        updateByQueryRequest.setScript(script);
        try {
            this.restHighLevelClient.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
            // Previously the method fell through to "return false" even when the request succeeded
            return true;
        } catch (IOException e) {
            log.error("[Elasticsearch] >> updateByQuery method exception,index:{},error message:{}", index, e.getMessage(), e);
            return false;
        }
    }

    /**
     * Fetches a document by ID and converts it via {@code EsFieldUtils.getContainEsDocId}.
     *
     * @param index index name
     * @param id    document ID
     * @param clazz target type
     * @param <T>   target type
     * @return the converted object, or {@code null} if the lookup returned no response
     */
    public <T> T getByIdContainEsDocId(String index, String id, Class<T> clazz) {
        GetResponse fetched = this.getById(index, id);
        return Objects.isNull(fetched) ? null : EsFieldUtils.getContainEsDocId(fetched, clazz);
    }

    /**
     * Fetches a document by ID and parses its source JSON into the given type.
     *
     * @param index index name
     * @param id    document ID
     * @param clazz target type
     * @param <T>   target type
     * @return the parsed object, or {@code null} if the lookup returned no response
     */
    public <T> T getById(String index, String id, Class<T> clazz) {
        GetResponse fetched = this.getById(index, id);
        return Objects.isNull(fetched) ? null : JSON.parseObject(fetched.getSourceAsString(), clazz);
    }

    /**
     * Fetches the raw get-response for a document ID.
     *
     * @param index index name
     * @param id    document ID
     * @return the {@link GetResponse}, or {@code null} if an {@link IOException} occurred
     */
    private GetResponse getById(String index, String id) {
        assertIndexAndId(index, id);

        GetResponse fetched;
        try {
            fetched = restHighLevelClient.get(new GetRequest(index, id), RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> getById method exception,index:{},id:{},error message:{}", index, id, e.getMessage(), e);
            fetched = null;
        }
        return fetched;
    }

    /**
     * Fetches several documents by ID and parses each one into the given type.
     * All requested documents are expected to share the same type, since every hit is parsed
     * into {@code clazz}.
     * <p>
     * IDs that do not exist are skipped. Items for which the multi-get reports a failure are
     * logged and skipped as well; such items carry no {@link GetResponse}, which previously
     * caused a {@link NullPointerException}.
     *
     * @param index  index name
     * @param idList document IDs
     * @param clazz  target type
     * @param <T>    target type
     * @return the parsed documents in response order; an empty list if the request failed or
     *         returned nothing
     */
    public <T> List<T> getByIdList(String index, List<String> idList, Class<T> clazz) {
        MultiGetItemResponse[] responses = this.getByIdList(index, idList);
        if (null == responses || responses.length == 0) {
            return Collections.emptyList();
        }

        List<T> resultList = new ArrayList<>(responses.length);
        for (MultiGetItemResponse response : responses) {
            if (response.isFailed()) {
                // A failed item has a Failure but no GetResponse
                log.warn("[Elasticsearch] >> getByIdList item failed,index:{},id:{},error message:{}",
                        index, response.getId(), response.getFailure().getMessage());
                continue;
            }
            GetResponse getResponse = response.getResponse();
            if (getResponse == null || !getResponse.isExists()) {
                continue;
            }
            resultList.add(JSON.parseObject(getResponse.getSourceAsString(), clazz));
        }

        return resultList;
    }

    /**
     * Fetches the raw multi-get item responses for a list of document IDs.
     *
     * @param index  index name
     * @param idList document IDs
     * @return the item responses, or {@code null} if an {@link IOException} occurred
     */
    private MultiGetItemResponse[] getByIdList(String index, List<String> idList) {
        assertIndexAndList(index, idList);

        try {
            MultiGetRequest multiGet = new MultiGetRequest();
            for (String id : idList) {
                multiGet.add(index, id);
            }

            // Synchronous call
            MultiGetResponse multiGetResponse = restHighLevelClient.mget(multiGet, RequestOptions.DEFAULT);
            return multiGetResponse.getResponses();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> getByIdList method exception,index:{},error message:{}", index, e.getMessage(), e);
            return null;
        }
    }

    /**
     * Runs a paged query using from/size ("shallow") pagination.
     *
     * @param index         index name
     * @param queryBuilder  query conditions
     * @param queryPageInfo paging request (page number, page size, sort field and order)
     * @param clazz         type each hit's source is parsed into
     * @param <T>           element type
     * @return the page of results with the total hit count; an empty page if there are no hits
     *         or an {@link IOException} occurred
     */
    public <T> ResponsePageInfo<T> searchPage(String index, QueryBuilder queryBuilder, QueryPageInfo queryPageInfo, Class<T> clazz) {
        assertIndex(index);

        final Integer pageNo = queryPageInfo.getPageNo();
        final Integer pageSize = queryPageInfo.getPageSize();
        SearchSourceBuilder source = getSearchSourceBuilder(queryBuilder, queryPageInfo, pageNo, pageSize);
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(source);

        SearchResponse response;
        try {
            response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> searchPage method exception,index:{},error message:{}", index, e.getMessage(), e);
            return ResponsePageInfo.emptyPage(pageNo, pageSize);
        }

        SearchHit[] hits = response.getHits().getHits();
        boolean noHits = null == hits || hits.length == 0;
        if (noHits) {
            return ResponsePageInfo.emptyPage(pageNo, pageSize);
        }

        long totalHits = response.getHits().getTotalHits().value;
        // Parse every hit's source; nothing is filtered out
        List<T> resultList = Arrays.stream(hits)
                .map(hit -> JSON.parseObject(hit.getSourceAsString(), clazz))
                .collect(Collectors.toList());
        return ResponsePageInfo.ofPage(pageNo, pageSize, totalHits, resultList);
    }

    /**
     * Builds the search source for a from/size paged query.
     *
     * @param queryBuilder  query conditions
     * @param queryPageInfo paging request; supplies the sort field and sort order
     * @param pageNo        page number; {@code from} is {@code pageSize * (pageNo - 1)}, floored at 0
     * @param pageSize      page size
     * @return the configured search source with total-hit tracking enabled
     */
    private SearchSourceBuilder getSearchSourceBuilder(QueryBuilder queryBuilder, QueryPageInfo queryPageInfo, Integer pageNo, Integer pageSize) {
        // Offset of the first hit; never negative
        final int offset = Math.max(pageSize * (pageNo - 1), 0);

        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(queryBuilder)
                .trackTotalHits(true)
                .from(offset)
                .size(pageSize);

        sort(source, queryPageInfo.getSortField(), queryPageInfo.getSortOrder());
        return source;
    }

    /**
     * Adds a field sort to the search source; does nothing when the sort field is blank.
     *
     * @param sourceBuilder search source to modify
     * @param sortField     field to sort by
     * @param sortOrder     sort direction, parsed with {@code SortOrder.fromString}; ascending when blank
     */
    private void sort(SearchSourceBuilder sourceBuilder, String sortField, String sortOrder) {
        if (!StringUtils.isBlank(sortField)) {
            // A blank sort order defaults to ascending
            SortOrder direction;
            if (StringUtils.isBlank(sortOrder)) {
                direction = SortOrder.ASC;
            } else {
                direction = SortOrder.fromString(sortOrder);
            }
            // Documents missing the field go first when ascending, last when descending
            final String missingPlacement = (SortOrder.ASC == direction) ? "_first" : "_last";
            sourceBuilder.sort(SortBuilders.fieldSort(sortField).order(direction).missing(missingPlacement));
        }
    }

    /**
     * Runs a query and returns the matching documents.
     * Wraps the query in a new search source and delegates to
     * {@link #searchList(String, SearchSourceBuilder, Class)}.
     *
     * @param index        index name
     * @param queryBuilder query conditions
     * @param clazz        type each hit's source is parsed into
     * @param <T>          element type
     * @return the parsed documents
     */
    public <T> List<T> searchList(String index, QueryBuilder queryBuilder, Class<T> clazz) {
        SearchSourceBuilder source = SearchSourceBuilder.searchSource();
        source.query(queryBuilder);
        return searchList(index, source, clazz);
    }

    /**
     * Runs a search described by a prepared search source and returns the matching documents.
     *
     * @param index         index name
     * @param sourceBuilder search source (query, size, sort, ...)
     * @param clazz         type each hit's source is parsed into
     * @param <T>           element type
     * @return the parsed documents; an empty list if there are no hits or an
     *         {@link IOException} occurred
     */
    public <T> List<T> searchList(String index, SearchSourceBuilder sourceBuilder, Class<T> clazz) {
        assertIndex(index);

        SearchHit[] hits;
        try {
            SearchRequest searchRequest = new SearchRequest(index);
            searchRequest.source(sourceBuilder);
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            hits = response.getHits().getHits();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> searchList method exception,index:{},error message:{}", index, e.getMessage(), e);
            return Collections.emptyList();
        }

        if (null == hits || hits.length == 0) {
            return Collections.emptyList();
        }

        List<T> resultList = new ArrayList<>(hits.length);
        for (int i = 0; i < hits.length; i++) {
            resultList.add(JSON.parseObject(hits[i].getSourceAsString(), clazz));
        }
        return resultList;
    }

    /**
     * Counts the documents matching a query.
     *
     * @param index        index name
     * @param queryBuilder query conditions
     * @return the match count, or {@code 0} if an {@link IOException} occurred
     */
    public long count(String index, QueryBuilder queryBuilder) {
        assertIndex(index);

        CountRequest countRequest = new CountRequest(index);
        countRequest.query(queryBuilder);

        long matched;
        try {
            CountResponse counted = this.restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
            matched = counted.getCount();
        } catch (IOException e) {
            log.error("[Elasticsearch] >> count method exception,index:{},error message:{}", index, e.getMessage(), e);
            matched = 0L;
        }
        return matched;
    }

    /**
     * Runs a scroll query without source filtering.
     * Delegates to the full overload with {@code null} include and exclude field lists.
     *
     * @param index            index name
     * @param scrollId         scroll ID from the previous call; empty to start a new scroll
     * @param keepAliveSeconds how long the scroll context is kept alive, in seconds
     * @param queryBuilder     query conditions
     * @param size             number of hits per batch
     * @param clazz            type each hit's source is parsed into
     * @param <T>              element type
     * @return the scroll result
     */
    public <T> EsScrollResult<T> scroll(String index, String scrollId, Long keepAliveSeconds, QueryBuilder queryBuilder,
                                        Integer size, Class<T> clazz) {
        final String[] noIncludes = null;
        final String[] noExcludes = null;
        return scroll(index, scrollId, keepAliveSeconds, queryBuilder, size, noIncludes, noExcludes, clazz);
    }

    /**
     * Runs a scroll query, either starting a new scroll or fetching the next batch.
     * When a batch comes back empty, the scroll context of that response is cleared; when the
     * request fails with an {@link IOException}, the caller-supplied scroll ID is cleared.
     *
     * @param index            index name
     * @param scrollId         scroll ID from the previous call; empty to start a new scroll
     * @param keepAliveSeconds how long the scroll context is kept alive, in seconds
     * @param queryBuilder     query conditions (used when starting a new scroll)
     * @param size             number of hits per batch (used when starting a new scroll)
     * @param includes         source fields to include
     * @param excludes         source fields to exclude
     * @param clazz            type each hit's source is parsed into
     * @param <T>              element type
     * @return a success result carrying the next scroll ID and the parsed hits, or an empty
     *         result when there is no data or the request failed
     */
    public <T> EsScrollResult<T> scroll(String index, String scrollId, Long keepAliveSeconds, QueryBuilder queryBuilder,
                                        Integer size, String[] includes, String[] excludes, Class<T> clazz) {
        assertIndex(index);

        final SearchResponse response;
        try {
            response = getScrollResponse(new String[]{index}, queryBuilder,
                    size, includes, excludes, scrollId, keepAliveSeconds);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> scroll method exception,scrollId:{},error message:{}", scrollId, e.getMessage(), e);
            clearScroll(scrollId);
            return EsScrollResult.ofEmpty(scrollId);
        }

        if (!isNullSearchResponse(response)) {
            final String nextScrollId = response.getScrollId();
            final List<T> resultList = searchHitsToList(response.getHits().getHits(), clazz);
            return EsScrollResult.ofSuccess(scrollId, nextScrollId, resultList);
        }

        // No data in this batch: clear the scroll context before reporting an empty result
        if (response != null) {
            clearScroll(response.getScrollId());
        }
        log.warn("[Elasticsearch] >> scroll查询响应结果没有数据,index:{},scrollId:{}", index, scrollId);
        return EsScrollResult.ofEmpty(scrollId);
    }

    /**
     * Tells whether a search response carries no hits.
     *
     * @param response search response, may be {@code null}
     * @return {@code true} if the response, its hits container or its hit array is {@code null},
     *         or the hit array is empty
     */
    private boolean isNullSearchResponse(SearchResponse response) {
        if (response == null) {
            return true;
        }
        if (response.getHits() == null) {
            return true;
        }
        SearchHit[] hitArray = response.getHits().getHits();
        return hitArray == null || hitArray.length == 0;
    }

    /**
     * Parses the source of each hit into the given type, dropping hits that parse to {@code null}.
     *
     * @param hits  search hits, may be {@code null} or empty
     * @param clazz type each hit's source is parsed into
     * @param <T>   element type
     * @return the parsed objects; an empty list when there are no hits
     */
    private <T> List<T> searchHitsToList(SearchHit[] hits, Class<T> clazz) {
        if (hits == null || hits.length == 0) {
            return Collections.emptyList();
        }

        List<T> parsedList = new ArrayList<>();
        for (SearchHit hit : hits) {
            T parsed = JSON.parseObject(hit.getSourceAsString(), clazz);
            if (Objects.nonNull(parsed)) {
                parsedList.add(parsed);
            }
        }
        return parsedList;
    }

    /**
     * Clears a scroll context; does nothing when the scroll ID is blank.
     * An {@link IOException} from the client is logged and not propagated.
     *
     * @param scrollId ID of the scroll context to clear
     */
    private void clearScroll(String scrollId) {
        if (!StringUtils.isBlank(scrollId)) {
            ClearScrollRequest clearRequest = new ClearScrollRequest();
            clearRequest.addScrollId(scrollId);
            try {
                restHighLevelClient.clearScroll(clearRequest, RequestOptions.DEFAULT);
            } catch (IOException e) {
                log.error("[Elasticsearch] >> clearScroll method exception,scrollId:{},error message:{}", scrollId, e.getMessage(), e);
            }
        }
    }

    /**
     * Executes one step of a scroll: the initial search when no scroll ID is given, otherwise a
     * scroll request for the next batch.
     *
     * @param indexArray       indices to search (used for the initial search only)
     * @param queryBuilder     query conditions (initial search only)
     * @param size             number of hits per batch (initial search only)
     * @param includes         source fields to include (initial search only)
     * @param excludes         source fields to exclude (initial search only)
     * @param scrollId         scroll ID from the previous step; empty to start a new scroll
     * @param keepAliveSeconds how long the scroll context is kept alive, in seconds
     * @return the search response for this step
     * @throws IOException if the client call fails
     */
    private SearchResponse getScrollResponse(String[] indexArray, QueryBuilder queryBuilder, Integer size,
                                             String[] includes, String[] excludes, String scrollId, Long keepAliveSeconds) throws IOException {
        final Scroll keepAlive = new Scroll(TimeValue.timeValueSeconds(keepAliveSeconds));

        if (!StringUtils.isEmpty(scrollId)) {
            // Continue an existing scroll
            SearchScrollRequest nextBatchRequest = new SearchScrollRequest(scrollId);
            nextBatchRequest.scroll(keepAlive);
            return this.restHighLevelClient.scroll(nextBatchRequest, RequestOptions.DEFAULT);
        }

        // Start a new scroll with the initial search
        SearchSourceBuilder source = SearchSourceBuilder
                .searchSource()
                .query(queryBuilder)
                .size(size)
                .fetchSource(includes, excludes);
        SearchRequest firstBatchRequest = new SearchRequest()
                .indices(indexArray)
                .scroll(keepAlive)
                .source(source);
        return this.restHighLevelClient.search(firstBatchRequest, RequestOptions.DEFAULT);
    }

    /**
     * Runs a search_after query without source filtering.
     * Delegates to the full overload with {@code null} include and exclude field lists.
     *
     * @param index        index name
     * @param queryBuilder query conditions
     * @param qo           search_after request parameters
     * @param clazz        type each hit's source is parsed into
     * @param <T>          element type
     * @return the search_after response
     */
    public <T> ResponseSearchAfter<T> searchAfter(String index, QueryBuilder queryBuilder, QuerySearchAfter qo, Class<T> clazz) {
        final String[] noIncludes = null;
        final String[] noExcludes = null;
        return searchAfter(index, queryBuilder, qo, noIncludes, noExcludes, clazz);
    }

    /**
     * Runs a search_after query.
     * <p>
     * The cursor ({@code qo.getSortId()}) is the comma-joined sort values of the last hit of the
     * previous page, as produced by this class. It is split back into one value per configured
     * sort clause before being passed to {@code search_after}; previously the whole string was
     * sent as a single value, which does not match the sort when several sort fields are used.
     * With a single sort clause the cursor is passed through unchanged. Because the separator is a
     * plain comma, sort values that themselves contain commas are only unambiguous in the last
     * sort position.
     *
     * @param index        index name
     * @param queryBuilder query conditions
     * @param qo           search_after request parameters (size, sort list, cursor, total-hit tracking)
     * @param includes     source fields to include
     * @param excludes     source fields to exclude
     * @param clazz        type each hit's source is parsed into
     * @param <T>          element type
     * @return the page of results with the next cursor; an empty response if there are no hits or
     *         an {@link IOException} occurred
     */
    public <T> ResponseSearchAfter<T> searchAfter(String index, QueryBuilder queryBuilder, QuerySearchAfter qo,
                                                  String[] includes, String[] excludes, Class<T> clazz) {
        assertIndex(index);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder)
                .trackTotalHits(qo.isTrackTotalHits())
                .size(qo.getSize());
        qo.getSortList().forEach(sortEntity -> sort(searchSourceBuilder, sortEntity.getSortField(), sortEntity.getSortOrder()));
        searchSourceBuilder.fetchSource(includes, excludes);
        if (StrUtil.isNotBlank(qo.getSortId())) {
            // Count the sort clauses actually applied (entries with a blank sort field are skipped by sort())
            int sortCount = searchSourceBuilder.sorts() == null ? 0 : searchSourceBuilder.sorts().size();
            // Split into at most one value per sort clause; a limit of 1 keeps the cursor intact
            String[] sortValues = qo.getSortId().split(COMMA, Math.max(sortCount, 1));
            searchSourceBuilder.searchAfter(sortValues);
        }

        SearchRequest request = new SearchRequest(new String[]{index}, searchSourceBuilder);

        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            if (isNullSearchResponse(response)) {
                return ResponseSearchAfter.ofEmpty(qo);
            }
            // The total is only reported when the caller asked for it to be tracked
            Long totalHits = null;
            if (qo.isTrackTotalHits()) {
                totalHits = response.getHits().getTotalHits().value;
            }

            return toResponseSearchAfter(response.getHits().getHits(), qo, totalHits, clazz);
        } catch (IOException e) {
            log.error("[Elasticsearch] >> searchAfter method exception,index:{},qo:{},error message:{}", index, qo, e.getMessage(), e);
        }
        return ResponseSearchAfter.ofEmpty(qo);
    }

    /**
     * Builds the search_after response from a non-empty hit array.
     * The next cursor is the sort values of the last hit, joined with commas.
     *
     * @param hits      search hits; the last element is read, so the array must not be empty
     * @param qo        the originating request parameters
     * @param totalHits total hit count, or {@code null} when it was not tracked
     * @param clazz     type each hit's source is parsed into
     * @param <T>       element type
     * @return the success response carrying the next cursor and the parsed hits
     */
    private <T> ResponseSearchAfter<T> toResponseSearchAfter(SearchHit[] hits, QuerySearchAfter qo, Long totalHits, Class<T> clazz) {
        final SearchHit lastHit = hits[hits.length - 1];
        final Object[] lastSortValues = lastHit.getSortValues();

        StringBuilder cursor = new StringBuilder();
        for (int i = 0; i < lastSortValues.length; i++) {
            if (i > 0) {
                cursor.append(COMMA);
            }
            cursor.append(lastSortValues[i].toString());
        }
        final String nextSortId = cursor.toString();

        List<T> resultList = searchHitsToList(hits, clazz);
        return ResponseSearchAfter.ofSuccess(qo, nextSortId, resultList, totalHits);
    }

}
